package com.parser.sixoo;

import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import lombok.Setter;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.io.File;
import java.io.FileInputStream;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RedisStoreMain {

    /**
     * 日志记录
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStoreMain.class);
    private static Properties props = new Properties();

    public static final String REDIS_CLUSTER_NODE_PORT = "cluster.node.port";
    public static final String REDIS_MAX_TOTAL = "max.total";
    public static final String REDIS_MAX_IDLE = "max.idle";
    public static final String REDIS_MAX_WAIT_MILLIS = "max.wait.millis";
    public static final String REDIS_AUTH_CODE = "auth.code";

    public static void loadProp(String name) throws Exception {
        String CONF_DIR = System.getProperty("PROJECT_CONF_DIR");
        try {
            if (CONF_DIR == null || CONF_DIR.isEmpty()) {
                props.load(RedisStoreMain.class.getClassLoader().getResourceAsStream(name));
            } else {
                String path = CONF_DIR + File.separator + name;
                props.load(new FileInputStream(path));
            }
        } catch (Exception e) {
            throw new Exception("加载" + name + "配置文件失败！conf_dir=" + CONF_DIR, e);
        }
    }

    public static Jedis createJedisCluterInstance() {
        String[] hostPortStr = String.valueOf(props.get(REDIS_CLUSTER_NODE_PORT)).split(":");
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxTotal(Integer.valueOf(String.valueOf(props.get(REDIS_MAX_TOTAL))));
        config.setMaxIdle(Integer.valueOf(String.valueOf(props.get(REDIS_MAX_IDLE))));
        config.setMaxWaitMillis(Integer.valueOf(String.valueOf(props.get(REDIS_MAX_WAIT_MILLIS))));

        JedisPool jedisPool = new JedisPool(config, hostPortStr[0], Integer.valueOf(hostPortStr[1]));

        final Jedis[] jedis = {null};

        Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
                .retryIfException()
                .withWaitStrategy(WaitStrategies.fixedWait(1000, TimeUnit.MILLISECONDS))
                .withStopStrategy(StopStrategies.stopAfterAttempt(10))
                .build();
        try {
            retryer.call(() -> {
                jedis[0] = jedisPool.getResource();
                jedis[0].auth(String.valueOf(props.get(REDIS_AUTH_CODE)));
                String value = jedis[0].ping();
                return StringUtils.isNotBlank(value);
            });
        } catch (Exception e) {
            LOGGER.error("多次获取Redis连接失败！");
        }
        return jedis[0];
    }

    public static void main2(String[] args) throws Exception {

        /**
         * 初始化map
         */
        loadProp("redis.properties");
        Jedis jedis = createJedisCluterInstance();
        List<Map<String, Double>> list = new ArrayList<>();
        for (int i = 0; i < 30000; i++) {
            list.add(new HashMap(2000));
        }
        Map<String, Double> map = null;

        /**
         * 数据模拟
         */
        long start = System.currentTimeMillis();
        for (int k = 1; k <= 10000; k++) {
            long time = start + k;
            for (int i = 0; i < 30000; i++) {
                map = list.get(i);
                map.put(i * 1d + ":" + time, time * 1d);
            }
            if (k % 2000 == 0) {
                long s2 = System.currentTimeMillis();
                for (int m = 0; m < list.size(); m++) {
                    try {
                        jedis.zadd("p" + m, list.get(m));
                    } catch (Exception e) {
                        jedis = createJedisCluterInstance();
                        jedis.zadd("p" + m, list.get(m));
                    }
                    list.get(m).clear();
                    System.out.println("m=" + m);
                }
                long e2 = System.currentTimeMillis();
                System.out.println("数据存储耗时：" + (e2 - s2));
            }
//            System.out.println("j=" + k/5000);
//            System.out.println("j=" + j + ",map的个数：" + map.size());
        }
        long end = System.currentTimeMillis();
        System.out.println("数据[模拟+组装]耗时：" + (end - start) / 1000);

        /**
         * 数据组装
         */
//        long s1 = System.currentTimeMillis();
//        int k = 0;
//        while (kvs.size() != 0) {
//            Model611 model = kvs.take();
//            Double key = model.getScore();
//            List<String> mem = model.getMembers();
//            for (int i = 0; i < mem.size(); i++) {
//                map = list.get(i);
//                map.put(mem.get(i), key);
//            }
//            System.out.println("k=" + k++ + "," + map.size());
//        }
//        long e1 = System.currentTimeMillis();
//
//        System.out.println("组装耗时：" + (e1 - s1));

        /**
         * 数据存储
         */
//        long s2 = System.currentTimeMillis();
//        for (int i = 0; i < list.size(); i++) {
//            jedis.zadd("p" + i, list.get(i));
//            System.out.println("i=" + i);
//        }
//        long e2 = System.currentTimeMillis();
//        System.out.println("数据存储耗时：" + (e2 - s2));


    }

    public static void main(String[] args) throws Exception {
        loadProp("redis.properties");
        Jedis jedis = createJedisCluterInstance();
//        for (int i = 0; i < 30000; i++) {
//            jedis.del("1:" + (1542955116263l+1));
//        }
//        System.out.println("del key结束");
        int thread = 60;
        int step = 60000 / thread;
        for (int i = 1; i <= thread; i++) {
            new RedisStoreMain().new integration((i - 1) * step, i * step).start();
        }
        for (int i = 0; i < 1000; i++) {
            new RedisStoreMain().new Store().start();
        }


    }


    private static final LinkedBlockingQueue<Map<String, Map<String, Double>>> queue = new LinkedBlockingQueue<>(30000);

    /**
     * 数据组装
     */
    @Setter
    class integration extends Thread {
        private int start;
        private int end;

        private integration(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public void run() {
            long s1 = System.currentTimeMillis();
            System.out.println(Thread.currentThread() + "开始时间：" + s1);
            for (int i = 0; i < 30000; i++) {
                String key = "pp" + i;
                Map<String, Double> sm = new HashMap();
                Map<String, Map<String, Double>> sm1 = new HashMap();
                for (int j = start; j < end; j++) {
                    long score = s1 + start + j;
                    sm.put(1 + ":" + score, score * 1d);
                }
                sm1.put(key, sm);
                boolean b_in = false;
                while (!b_in){
                    try {
                        queue.offer(sm1,1000,TimeUnit.MICROSECONDS);
                        b_in = true;
                    }catch (Exception e){
                        System.out.println("重新加入队列");
                    }

                }

                System.out.println(Thread.currentThread() + "," + i + "," + (System.currentTimeMillis() - s1));
            }
            System.out.println(Thread.currentThread() + "结束时间：" + System.currentTimeMillis() + ",queue大小：" + queue.size());
        }
    }

    class Store extends Thread {
        @Override
        public void run() {
            Jedis jedis = createJedisCluterInstance();
            while (true) {
                long s1 = System.currentTimeMillis();
                try {
                    Map<String, Map<String, Double>> zset = queue.poll(1000,TimeUnit.MICROSECONDS);
                    for (Map.Entry<String, Map<String, Double>> entry : zset.entrySet())
                        try {
                            jedis.zadd(entry.getKey(), entry.getValue());
                        } catch (Exception e) {
                            jedis.zadd(entry.getKey(), entry.getValue());
                        }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                long e1 = System.currentTimeMillis();
                System.out.println(Thread.currentThread()+"存储耗时：" + (e1 - s1)+",queue="+queue.size());
            }
        }
    }

}
